package com.duoduo.cannales.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.duoduo.cannales.dao.UserDao;
import com.duoduo.cannales.entity.User;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * cannal 处理mysql bin-log数据
 *
 * @author qian
 * @version 1.0
 * @date 2021/3/8 16:19
 */
@Slf4j
@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Resource
    private UserDao userMapper;

    @Resource
    private RestHighLevelClient restHighLevelClient;

    @Resource
    private CanalConnector canalConnector;

    @Scheduled(fixedDelay = 100) //每隔100秒执行
    @Override
    public void run() {
        long batchId = -1;
        try {
            //每次拉取条数
            int batchSize = 1000;
            Message message = canalConnector.getWithoutAck(batchSize);
            //批次id
            batchId = message.getId();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (batchId != -1 && entries.size() > 0) {
                entries.forEach(entry -> {
                    //MySQL种my.cnf中配置的是binlog_format = ROW，这里只解析ROW类型
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        //解析处理
                        publishCanalEvent(entry);
                    }
                });
            }
            canalConnector.ack(batchId);
        } catch (Exception e) {
            e.printStackTrace();
            canalConnector.rollback(batchId);
        }
    }

    private void publishCanalEvent(CanalEntry.Entry entry) {
        // CanalEntry.EntryType entryType = entry.getEntryType();
        //表名
        String tableName = entry.getHeader().getTableName();
        //数据库名
        String database = entry.getHeader().getSchemaName();
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return;
        }
        rowChange.getRowDatasList().forEach(rowData -> {
            //这里也可以获取改变前的数据
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            beforeColumnsList.stream().forEach(column -> {
                log.info("更改前的数据:name:{},value:{}", column.getName(), column.getValue());
            });
            //获取改变后的数据
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
           /* String primaryKey = "id";
            CanalEntry.Column idColumn = afterColumnsList.stream().filter(column ->
                    column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);*/
            Map<String, Object> columnsToMap = parseColumnsToMap(afterColumnsList);
            try {
                //插入es
                indexES(columnsToMap, database, tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> map = new HashMap<>();
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            log.info("更改后的数据:name:{},value:{}", column.getName(), column.getValue());
            map.put(column.getName(), column.getValue());
        });
        return map;
    }

    /**
     * ps:
     * 1. 问题1：异常：java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
     * 如果使用下面写法：
     * User user = userMapper.selectById(new Integer((String) dataMap.get("id")));
     * .....
     * indexRequest.source(user);
     * 所以这里我改成使用indexRequest.source(map)，使用map;
     * <p>
     * 2.问题2：异常：cannot write xcontent for unknown value of type class java.sql.Timestamp
     * QueryWrapper<User> queryWrapper = new QueryWrapper<>();
     * queryWrapper.ge("id", new Integer((String) dataMap.get("id")));
     * List<Map<String, Object>> maps = userMapper.selectMaps(queryWrapper);
     * for (Map<String, Object> map : maps) {
     * IndexRequest indexRequest = new IndexRequest();
     * indexRequest.id(String.valueOf(map.get("id")));
     * indexRequest.source(map);
     * restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
     * }
     * User对象中，时间为Date类型
     * 调用此代码：List<Map<String, Object>> maps = userMapper.selectMaps(queryWrapper);
     * maps中的updateAt、createAt两字段不是实体类中定义的date类型成了java.sql.Timestamp类型
     * es7.3.2无法处理Timestamp类型，因此这里修改写法，正确的写法在下面代码中
     * <p>
     * 问题3：
     * 异常：Found interface org.elasticsearch.common.bytes.BytesReference, but class was expected
     * 控制台输出：
     * java.lang.IncompatibleClassChangeError: Found interface org.elasticsearch.common.bytes.BytesReference, but class was expected
     * at org.elasticsearch.client.RequestConverters.index(RequestConverters.java:340) ~[elasticsearch-rest-high-level-client-7.4.2.jar:7.6.2]
     * at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1450) ~[elasticsearch-rest-high-level-client-7.4.2.jar:7.6.2]
     * at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424) ~[elasticsearch-rest-high-level-client-7.4.2.jar:7.6.2]
     * 原因：我个人猜想：可能是版本问题
     * 在spring boot2.3.0.RELEASE版本中,可以查看es的依赖版本是7.6.2，而我elasticsearch-rest-high-level-client版本是7.4.2，与我安装的es版本一致
     * 因此我在这里修改spring boot2.3.0.RELEASE版本中默认提供的es版本，在pom.xml中增加如下部分：
     * <properties>
     * <elasticsearch.version>7.4.2</elasticsearch.version>
     * </properties>
     * 问题即可解决。
     */
    private void indexES(Map<String, Object> dataMap, String database, String table) throws IOException {
        log.info("dataMap:{},database:{},table:{}", dataMap, database, table);
        //不是“dianping”库中的，不处理
        if (!StringUtils.equals("test", database)) {
            return;
        }
        //不是user表中的数据不处理
        if (StringUtils.equals("user", table)) {
            //利用jpa 根据id查询出数据，并将其转化成map
            User user = userMapper.findById(new Integer((String) dataMap.get("id"))).get();
            Map<String, Object> map = JSON.parseObject(JSON.toJSONString(user, SerializerFeature.WriteNullStringAsEmpty,
                    SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteMapNullValue), Map.class);
            IndexRequest indexRequest = new IndexRequest("user");
            indexRequest.id(String.valueOf(map.get("id")));
            indexRequest.source(map);
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } else {
            return;
        }
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
